"""Replica set fixture for executing JSTests against."""

import os.path
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from typing import Optional

import bson
import pymongo
import pymongo.errors
import pymongo.write_concern

from buildscripts.resmokelib.extensions import (
    delete_extension_configs,
    find_and_generate_extension_configs,
)
from buildscripts.resmokelib.testing.fixtures import interface


def compare_timestamp(timestamp1, timestamp2):
    """Compare the timestamp object ts part."""
    if timestamp1.time == timestamp2.time:
        if timestamp1.inc < timestamp2.inc:
            return -1
        elif timestamp1.inc > timestamp2.inc:
            return 1
        else:
            return 0
    elif timestamp1.time < timestamp2.time:
        return -1
    else:
        return 1


def compare_optime(optime1, optime2):
    """Compare timestamp object t part."""
    if optime1["t"] > optime2["t"]:
        return 1
    elif optime1["t"] < optime2["t"]:
        return -1
    else:
        return compare_timestamp(optime1["ts"], optime2["ts"])


class ReplicaSetFixture(interface.ReplFixture, interface._DockerComposeInterface):
    """Fixture which provides JSTests with a replica set to run against."""

    AWAIT_SHARDING_INITIALIZATION_TIMEOUT_SECS = 60

    def __init__(
        self,
        logger,
        job_num,
        fixturelib,
        mongod_executable=None,
        mongod_options=None,
        dbpath_prefix=None,
        preserve_dbpath=False,
        num_nodes=2,
        start_initial_sync_node=False,
        electable_initial_sync_node=False,
        write_concern_majority_journal_default=None,
        auth_options=None,
        replset_config_options=None,
        voting_secondaries=True,
        all_nodes_electable=False,
        use_replica_set_connection_string=None,
        linear_chain=False,
        default_read_concern=None,
        default_write_concern=None,
        shard_logging_prefix=None,
        replicaset_logging_prefix=None,
        replset_name=None,
        use_auto_bootstrap_procedure=None,
        initial_sync_uninitialized_fcv=False,
        hide_initial_sync_node_from_conn_string=False,
        launch_mongot=False,
        load_all_extensions=False,
        router_endpoint_for_mongot: Optional[int] = None,
        disagg_base_config=None,
    ):
        """Initialize ReplicaSetFixture."""

        interface.ReplFixture.__init__(
            self, logger, job_num, fixturelib, dbpath_prefix=dbpath_prefix
        )

        self.mongod_executable = mongod_executable
        self.mongod_options = self.fixturelib.make_historic(
            self.fixturelib.default_if_none(mongod_options, {})
        )

        self.load_all_extensions = load_all_extensions or self.config.LOAD_ALL_EXTENSIONS
        if self.load_all_extensions:
            self.loaded_extensions = find_and_generate_extension_configs(
                is_evergreen=self.config.EVERGREEN_TASK_ID,
                logger=self.logger,
                mongod_options=self.mongod_options,
            )

        self.preserve_dbpath = preserve_dbpath
        self.start_initial_sync_node = start_initial_sync_node
        self.electable_initial_sync_node = electable_initial_sync_node
        self.write_concern_majority_journal_default = write_concern_majority_journal_default
        self.auth_options = auth_options
        self.replset_config_options = self.fixturelib.make_historic(
            self.fixturelib.default_if_none(replset_config_options, {})
        )
        self.voting_secondaries = voting_secondaries
        self.all_nodes_electable = all_nodes_electable
        self.use_replica_set_connection_string = use_replica_set_connection_string
        self.default_read_concern = default_read_concern
        self.default_write_concern = default_write_concern
        self.shard_logging_prefix = shard_logging_prefix
        self.replicaset_logging_prefix = replicaset_logging_prefix
        self.num_nodes = num_nodes
        self.replset_name = replset_name
        self.initial_sync_uninitialized_fcv = initial_sync_uninitialized_fcv
        self.hide_initial_sync_node_from_conn_string = hide_initial_sync_node_from_conn_string
        # Used by the enhanced multiversion system to signify multiversion mode.
        # None implies no multiversion run.
        self.fcv = None
        # Used by suites that run search integration tests.
        self.launch_mongot = launch_mongot
        # Used to set --mongoHostAndPort startup option on mongot.
        self.router_endpoint_for_mongot = router_endpoint_for_mongot
        # Use the values given from the command line if they exist for linear_chain and num_nodes.
        linear_chain_option = self.fixturelib.default_if_none(
            self.config.LINEAR_CHAIN, linear_chain
        )
        self.linear_chain = linear_chain_option if linear_chain_option else linear_chain

        # By default, we only use a replica set connection string if all nodes are capable of being
        # elected primary.
        if self.use_replica_set_connection_string is None:
            self.use_replica_set_connection_string = self.all_nodes_electable

        if self.default_write_concern is True:
            self.default_write_concern = self.fixturelib.make_historic(
                {
                    "w": "majority",
                    # Use a "signature" value that won't typically match a value assigned in normal use.
                    # This way the wtimeout set by this override is distinguishable in the server logs.
                    "wtimeout": 5 * 60 * 1000 + 321,  # 300321ms
                }
            )

        # Set the default oplogSize to 511MB.
        self.mongod_options.setdefault("oplogSize", 511)

        self.disagg_base_config = disagg_base_config

        # The dbpath in mongod_options is used as the dbpath prefix for replica set members and
        # takes precedence over other settings. The ShardedClusterFixture uses this parameter to
        # create replica sets and assign their dbpath structure explicitly.
        if "dbpath" in self.mongod_options:
            self._dbpath_prefix = self.mongod_options.pop("dbpath")
        else:
            self._dbpath_prefix = os.path.join(self._dbpath_prefix, self.config.FIXTURE_SUBDIR)

        self.nodes = []
        if "serverless" not in self.mongod_options:
            if not self.replset_name:
                self.replset_name = "rs"
            self.replset_name = self.mongod_options.setdefault("replSet", self.replset_name)
        self.initial_sync_node = None
        self.initial_sync_node_idx = -1
        self.use_auto_bootstrap_procedure = use_auto_bootstrap_procedure
        # The below ports will be set in setup() after the MongoTFixture has been launched.
        self.mongot_port = None
        # mongot_grpc_port is the ingress grpc port on mongot that is configured for the search
        # in community architecture. See setup_mongot_params and MongoDFixture.__init__ for more details.
        self.mongot_grpc_port = None
        # Track the fixture removal [teardown] performed during removeShard testing.
        # This is needed, because we expect the fixture to be in the 'running' state
        # when the evergeen job performs the final teardown. Therefore if the fixture was
        # teared down earlier, it must be skipped during those final checks.
        self.removeshard_teardown_marker = False
        self.removeshard_teardown_mutex = Lock()
        # Track the number of times the fixture has been teared down. This can be used as a restart
        # indicator for hooks that need to reset state upon a restart.
        self.teardown_counter = 0

    def setup(self):
        """Set up the replica set."""

        if self.disagg_base_config:
            # Wait for primary (first node) to get elected first before
            # starting other nodes, otherwise the election can race.
            self.nodes[0].setup()
            self.nodes[0].await_ready()
            self._await_primary()
            for i in range(1, self.num_nodes):
                self.nodes[i].setup()
                self.nodes[i].await_ready()
            return

        start_node = 0
        if self.use_auto_bootstrap_procedure:
            # We need to wait for the first node to finish auto-bootstrapping so that we can
            # get the auto generated replSet name and update the replSet name of the other mongods with it.
            self.nodes[0].setup()
            self.nodes[0].await_ready()
            self._await_primary()  # Wait for writeable primary (this indicates replSet auto-intiiate finished).

            client = interface.build_client(self.nodes[0], self.auth_options)
            res = client.admin.command("hello")

            self.logger.info(
                f"ReplicaSetFixture using auto generated replSet name {res['setName']} instead of {self.replset_name}"
            )

            self.replset_name = res["setName"]
            self.mongod_options.setdefault("replSet", self.replset_name)
            # The first node should not have the --replSet option because it is auto-bootstrapped.
            for i in range(1, self.num_nodes):
                self.nodes[i].mongod_options["replSet"] = self.replset_name

            start_node = 1

        with ThreadPoolExecutor() as executor:
            tasks = []

            # Version-agnostic options for mongod/s can be set here.
            # Version-specific options should be set in get_version_specific_options_for_mongod()
            # to avoid options for old versions being applied to new Replicaset fixtures.
            # If we are using the auto_bootstrap_procedure we don't need to setup the first node again.
            for i in range(start_node, self.num_nodes):
                tasks.append(executor.submit(self.nodes[i].setup))

            if self.initial_sync_node:
                tasks.append(executor.submit(self.initial_sync_node.setup))

            # Wait for the setup of all nodes to complete
            for task in as_completed(tasks):
                task.result()

        if self.initial_sync_node:
            self.initial_sync_node.await_ready()
            if self.initial_sync_uninitialized_fcv:
                self._pause_initial_sync_at_uninitialized_fcv(self.initial_sync_node)

        if not self.use_auto_bootstrap_procedure:
            # We need only to wait to connect to the first node of the replica set because we first
            # initiate it as a single node replica set.
            self.nodes[0].await_ready()

        # Initiate the replica set.
        members = []
        for i, node in enumerate(self.nodes):
            member_info = {"_id": i, "host": node.get_internal_connection_string()}
            if i > 0:
                if not self.all_nodes_electable:
                    member_info["priority"] = 0
                if i >= 7 or not self.voting_secondaries:
                    # Only 7 nodes in a replica set can vote, so the other members must still be
                    # non-voting when this fixture is configured to have voting secondaries.
                    member_info["votes"] = 0
            members.append(member_info)
        if self.initial_sync_node:
            member_config = {
                "_id": self.initial_sync_node_idx,
                "host": self.initial_sync_node.get_internal_connection_string(),
            }
            if not self.electable_initial_sync_node:
                member_config["hidden"] = 1
                member_config["votes"] = 0
                member_config["priority"] = 0

            members.append(member_config)

        repl_config = {"_id": self.replset_name, "protocolVersion": 1}
        client = interface.build_client(self.nodes[0], self.auth_options)

        if (
            client.local.system.replset.count_documents(filter={})
            and not self.use_auto_bootstrap_procedure
        ):
            # Skip initializing the replset if there is an existing configuration.
            # Auto-bootstrapping will automatically create a configuration document but we do not
            # want to skip reconfiguring the replset (which adds the other nodes
            # to the auto-bootstrapped replset).
            self.logger.info("Configuration exists. Skipping initializing the replset.")
            return

        if self.write_concern_majority_journal_default is not None:
            repl_config["writeConcernMajorityJournalDefault"] = (
                self.write_concern_majority_journal_default
            )
        else:
            server_status = client.admin.command({"serverStatus": 1})
            if not server_status["storageEngine"]["persistent"]:
                repl_config["writeConcernMajorityJournalDefault"] = False

        if self.replset_config_options.get("configsvr", False) or (
            self.use_auto_bootstrap_procedure and "shardsvr" not in self.mongod_options
        ):
            repl_config["configsvr"] = True
        if self.replset_config_options.get("settings"):
            replset_settings = self.replset_config_options["settings"]
            repl_config["settings"] = replset_settings

        # Increase the election timeout to 24 hours to prevent spurious elections.
        repl_config.setdefault("settings", {})
        if "electionTimeoutMillis" not in repl_config["settings"]:
            repl_config["settings"]["electionTimeoutMillis"] = 24 * 60 * 60 * 1000

        # Start up a single node replica set then reconfigure to the correct size (if the config
        # contains more than 1 node), so the primary is elected more quickly.
        repl_config["members"] = [members[0]]

        # When this is True, we are running in Antithesis & modify the config to surface more bugs
        if self.config.NOOP_MONGO_D_S_PROCESSES:
            repl_config["settings"]["electionTimeoutMillis"] = 2000
            repl_config["settings"]["chainingAllowed"] = False
            repl_config["settings"]["heartbeatTimeoutSecs"] = 1
            repl_config["settings"]["catchUpTimeoutMillis"] = 0

        if self.use_auto_bootstrap_procedure:
            # Auto-bootstrap already initiates automatically on the first node, but we still need
            # to apply the requested repl_config settings using reconfig.
            self._reconfig_repl_set(client, repl_config)
        else:
            self.logger.info("Issuing replSetInitiate command: %s", repl_config)
            self._initiate_repl_set(client, repl_config)
            self._await_primary()

        if self.fcv is not None:
            # Initiating a replica set with a single node will use "latest" FCV. This will
            # cause IncompatibleServerVersion errors if additional "last-lts" binary version
            # nodes are subsequently added to the set, since such nodes cannot set their FCV to
            # "latest". Therefore, we make sure the primary is "last-lts" FCV before adding in
            # nodes of different binary versions to the replica set.
            client.admin.command(
                {
                    "setFeatureCompatibilityVersion": self.fcv,
                    "fromConfigServer": True,
                }
            )

        if self.nodes[1:]:
            # Wait to connect to each of the secondaries before running the replSetReconfig
            # command.
            for node in self.nodes[1:]:
                node.await_ready()
            # Add in the members one at a time, since non force reconfigs can only add/remove a
            # single voting member at a time.
            for ind in range(2, len(members) + 1):
                self._add_node_to_repl_set(client, repl_config, ind, members)

        self.removeshard_teardown_marker = False

    def _all_mongo_d_s_t(self):
        """Return a list of all `mongo{d,s,t}` `Process` instances in this fixture."""
        nodes = sum([node._all_mongo_d_s_t() for node in self.nodes], [])

        if self.initial_sync_node:
            nodes.extend(self.initial_sync_node._all_mongo_d_s_t())

        return nodes

    def _all_mongots(self):
        """Return a list of all `mongot` `Process` instances in this fixture."""
        return [node.mongot for node in self.nodes]

    def pids(self):
        """:return: all pids owned by this fixture if any."""
        pids = []
        for node in self.nodes:
            pids.extend(node.pids())
        if not pids:
            self.logger.debug("No members running when gathering replicaset fixture pids.")
        return pids

    def _add_node_to_repl_set(self, client, repl_config, member_index, members):
        self.logger.info("Adding in node %d: %s", member_index, members[member_index - 1])
        repl_config["members"] = members[:member_index]
        self._reconfig_repl_set(client, repl_config)

    def _reconfig_repl_set(self, client, repl_config):
        while True:
            try:
                # 'newlyAdded' removal reconfigs could bump the version.
                # Get the current version to be safe.
                curr_version = client.admin.command({"replSetGetConfig": 1})["config"]["version"]
                repl_config["version"] = curr_version + 1

                self.logger.info("Issuing replSetReconfig command: %s", repl_config)
                client.admin.command(
                    {
                        "replSetReconfig": repl_config,
                        "maxTimeMS": self.AWAIT_REPL_TIMEOUT_MINS * 60 * 1000,
                    }
                )
                break
            except pymongo.errors.OperationFailure as err:
                # These error codes may be transient, and so we retry the reconfig with a
                # (potentially) higher config version. We should not receive these codes
                # indefinitely.
                if err.code not in [
                    ReplicaSetFixture._NEW_REPLICA_SET_CONFIGURATION_INCOMPATIBLE,
                    ReplicaSetFixture._CURRENT_CONFIG_NOT_COMMITTED_YET,
                    ReplicaSetFixture._CONFIGURATION_IN_PROGRESS,
                    ReplicaSetFixture._NODE_NOT_FOUND,
                    ReplicaSetFixture._INTERRUPTED_DUE_TO_REPL_STATE_CHANGE,
                    ReplicaSetFixture._INTERRUPTED_DUE_TO_STORAGE_CHANGE,
                ]:
                    msg = (
                        "Operation failure while setting up the " "replica set fixture: {}"
                    ).format(err)
                    self.logger.error(msg)
                    raise self.fixturelib.ServerFailure(msg)

                msg = ("Retrying failed attempt to add new node to fixture: {}").format(err)
                self.logger.error(msg)
                time.sleep(0.1)  # Wait a little bit before trying again.

    def _initiate_repl_set(self, client, repl_config):
        # replSetInitiate (and replSetReconfig) commands can fail with a NodeNotFound error
        # if a heartbeat times out during the quorum check. We retry three times to reduce
        # the chance of failing this way.
        num_initiate_attempts = 3
        for attempt in range(1, num_initiate_attempts + 1):
            try:
                client.admin.command({"replSetInitiate": repl_config})
                break
            except pymongo.errors.OperationFailure as err:
                # Retry on NodeNotFound errors from the "replSetInitiate" command.
                if err.code != ReplicaSetFixture._NODE_NOT_FOUND:
                    msg = (
                        "Operation failure while configuring the " "replica set fixture: {}"
                    ).format(err)
                    self.logger.error(msg)
                    raise self.fixturelib.ServerFailure(msg)

                msg = "replSetInitiate failed attempt {0} of {1} with error: {2}".format(
                    attempt, num_initiate_attempts, err
                )
                self.logger.error(msg)
                if attempt == num_initiate_attempts:
                    msg = "Exceeded number of retries while configuring the replica set fixture"
                    self.logger.error(msg + ".")
                    raise self.fixturelib.ServerFailure(msg)
                time.sleep(5)  # Wait a little bit before trying again.

    def await_last_op_committed(self, timeout_secs=None):
        """Wait for the last majority committed op to be visible."""
        primary_client = interface.build_client(self.get_primary(), self.auth_options)

        primary_optime = get_last_optime(primary_client, self.fixturelib)
        up_to_date_nodes = set()

        def check_rcmaj_optime(client, node):
            """Return True if all nodes have caught up with the primary."""
            res = client.admin.command({"replSetGetStatus": 1})

            if "readConcernMajorityOpTime" not in res["optimes"]:
                # This can be null when the node is starting up.
                return False

            read_concern_majority_optime = res["optimes"]["readConcernMajorityOpTime"]

            if compare_optime(read_concern_majority_optime, primary_optime) >= 0:
                up_to_date_nodes.add(node.port)

            return len(up_to_date_nodes) == len(self.nodes)

        self._await_cmd_all_nodes(
            check_rcmaj_optime, "waiting for last committed optime", timeout_secs
        )

    def await_ready(self):
        """Wait for replica set to be ready."""
        self._await_primary()
        self._await_secondaries()
        self._await_newly_added_removals()
        self._await_stable_recovery_timestamp()
        self._setup_cwrwc_defaults()
        self._await_read_concern_available()
        if self.use_auto_bootstrap_procedure:
            # TODO: Remove this in SERVER-80010.
            self._await_auto_bootstrapped_config_shard()

    def _await_primary(self):
        # Wait for the primary to be elected.
        # Since this method is called at startup we expect the first node to be primary even when
        # self.all_nodes_electable is True.
        primary = self.nodes[0]
        client = primary.mongo_client()
        while True:
            self.logger.info("Waiting for primary on port %d to be elected.", primary.port)
            cmd_result = client.admin.command("isMaster")
            if cmd_result["ismaster"]:
                break
            time.sleep(0.1)  # Wait a little bit before trying again.
        self.logger.info("Primary on port %d successfully elected.", primary.port)

    def _await_secondaries(self):
        # Wait for the secondaries to become available.
        # Since this method is called at startup we expect the nodes 1 to n to be secondaries even
        # when self.all_nodes_electable is True.
        secondaries = self.nodes[1:]
        if self.initial_sync_node and not self.initial_sync_uninitialized_fcv:
            secondaries.append(self.initial_sync_node)

        for secondary in secondaries:
            client = secondary.mongo_client(read_preference=pymongo.ReadPreference.SECONDARY)
            while True:
                self.logger.info(
                    "Waiting for secondary on port %d to become available.", secondary.port
                )
                try:
                    is_secondary = client.admin.command("isMaster")["secondary"]
                    if is_secondary:
                        break
                except pymongo.errors.OperationFailure as err:
                    if err.code != ReplicaSetFixture._INTERRUPTED_DUE_TO_STORAGE_CHANGE:
                        raise
                except pymongo.errors.AutoReconnect:
                    self.logger.info("AutoReconnect exception thrown, retrying...")
                time.sleep(0.1)  # Wait a little bit before trying again.
            self.logger.info("Secondary on port %d is now available.", secondary.port)

    def _await_stable_recovery_timestamp(self):
        """
        Awaits stable recovery timestamps on all nodes in the replica set.

        Performs some writes and then waits for all nodes in this replica set to establish a stable
        recovery timestamp. The writes are necessary to prompt storage engines to quickly establish
        stable recovery timestamps.

        A stable recovery timestamp ensures recoverable rollback is possible, as well as startup
        recovery without re-initial syncing in the case of durable storage engines. By waiting for
        all nodes to report having a stable recovery timestamp, we ensure a degree of stability in
        our tests to run as expected.
        """

        # Since this method is called at startup we expect the first node to be primary even when
        # self.all_nodes_electable is True.
        primary_client = interface.build_client(self.nodes[0], self.auth_options)

        # All nodes must be in primary/secondary state prior to this point. Perform a majority
        # write to ensure there is a committed operation on the set. The commit point will
        # propagate to all members and trigger a stable checkpoint on all persisted storage engines
        # nodes.
        admin = primary_client.get_database(
            "admin", write_concern=pymongo.write_concern.WriteConcern(w="majority")
        )
        admin.command("appendOplogNote", data={"await_stable_recovery_timestamp": 1})

        for node in self.nodes:
            self.logger.info(
                "Waiting for node on port %d to have a stable recovery timestamp.", node.port
            )
            client = interface.build_client(
                node, self.auth_options, read_preference=pymongo.ReadPreference.SECONDARY
            )

            client_admin = client["admin"]

            while True:
                status = client_admin.command("replSetGetStatus")

                # The `lastStableRecoveryTimestamp` field contains a stable timestamp guaranteed to
                # exist on storage engine recovery to a stable timestamp.
                last_stable_recovery_timestamp = status.get("lastStableRecoveryTimestamp", None)

                # A missing `lastStableRecoveryTimestamp` field indicates that the storage
                # engine does not support "recover to a stable timestamp".
                if not last_stable_recovery_timestamp:
                    break

                # A null `lastStableRecoveryTimestamp` indicates that the storage engine supports
                # "recover to a stable timestamp" but does not have a stable recovery timestamp yet.
                if last_stable_recovery_timestamp.time:
                    self.logger.info(
                        "Node on port %d now has a stable timestamp for recovery. Time: %s",
                        node.port,
                        last_stable_recovery_timestamp,
                    )
                    break
                time.sleep(0.1)  # Wait a little bit before trying again.

    def _should_await_newly_added_removals_longer(self, client):
        """
        Return whether the current replica set config has any 'newlyAdded' fields.

        Return true if the current config is not committed.
        """

        get_config_res = client.admin.command(
            {"replSetGetConfig": 1, "commitmentStatus": True, "$_internalIncludeNewlyAdded": True}
        )
        for member in get_config_res["config"]["members"]:
            if "newlyAdded" in member:
                self.logger.info(
                    "Waiting longer for 'newlyAdded' removals, "
                    + "member %d is still 'newlyAdded'",
                    member["_id"],
                )
                return True
        if not get_config_res["commitmentStatus"]:
            self.logger.info(
                "Waiting longer for 'newlyAdded' removals, " + "config is not yet committed"
            )
            return True

        return False

    def _await_newly_added_removals(self):
        """
        Wait for all 'newlyAdded' fields to be removed from the replica set config.

        Additionally, wait for that config to be committed, and for the in-memory
        and on-disk configs to match.
        """

        self.logger.info("Waiting to remove all 'newlyAdded' fields")
        primary = self.get_primary()
        client = interface.build_client(primary, self.auth_options)
        while self._should_await_newly_added_removals_longer(client):
            time.sleep(0.1)  # Wait a little bit before trying again.
        self.logger.info("All 'newlyAdded' fields removed")

    def _setup_cwrwc_defaults(self):
        """Set up the cluster-wide read/write concern defaults."""
        if self.default_read_concern is None and self.default_write_concern is None:
            return
        cmd = {"setDefaultRWConcern": 1}
        if self.default_read_concern is not None:
            cmd["defaultReadConcern"] = self.default_read_concern
        if self.default_write_concern is not None:
            cmd["defaultWriteConcern"] = self.default_write_concern
        primary = self.nodes[0]
        primary.mongo_client().admin.command(cmd)

    # TODO: Remove this in SERVER-80010.
    def _await_auto_bootstrapped_config_shard(self):
        connection_string = self.get_driver_connection_url()
        self.logger.info("Waiting for %s to auto-bootstrap as a config shard...", connection_string)

        deadline = time.time() + ReplicaSetFixture.AWAIT_SHARDING_INITIALIZATION_TIMEOUT_SECS

        def timeout_occurred():
            return deadline - time.time() <= 0.0

        while True:
            client = interface.build_client(self.get_primary(), self.auth_options)
            config_shard_count = client.get_database("config").command(
                {"count": "shards", "query": {"_id": "config"}}
            )

            if config_shard_count["n"] == 1:
                break

            if timeout_occurred():
                port = self.get_primary().port
                raise self.fixturelib.ServerFailure(
                    "mongod on port: {} failed waiting for auto-bootstrapped config shard success after {} seconds".format(
                        port, interface.Fixture.AWAIT_READY_TIMEOUT_SECS
                    )
                )
            time.sleep(0.1)

        self.logger.info(
            "%s successfully auto-bootstrapped as a config shard...", connection_string
        )

    def _check_initial_sync_node_has_uninitialized_fcv(self, initial_sync_node):
        sync_node_conn = initial_sync_node.mongo_client()
        self.logger.info("Checking that initial sync node has uninitialized fcv")
        try:
            fcv = sync_node_conn.admin.command(
                {"getParameter": 1, "featureCompatibilityVersion": 1}
            )

            msg = "Initial sync node should have an uninitialized FCV, but got fcv: " + str(fcv)
            raise self.fixturelib.ServerFailure(msg)
        except pymongo.errors.OperationFailure as err:
            if err.code == 258:  # codeName == 'UnknownFeatureCompatibilityVersion'
                return
            raise

    def _pause_initial_sync_at_uninitialized_fcv(self, initial_sync_node):
        failpointOnCmd = {
            "configureFailPoint": "initialSyncHangAfterResettingFCV",
            "mode": "alwaysOn",
        }
        sync_node_conn = initial_sync_node.mongo_client()
        self.logger.info("Pausing initial sync at failpoint")
        sync_node_conn.admin.command(failpointOnCmd)
        self._check_initial_sync_node_has_uninitialized_fcv(initial_sync_node)

    def _unpause_initial_sync(self, initial_sync_node):
        failpoint_off_cmd = {
            "configureFailPoint": "initialSyncHangAfterResettingFCV",
            "mode": "off",
        }
        self.logger.info("Unpausing initial sync")
        sync_node_conn = initial_sync_node.mongo_client()
        sync_node_conn.admin.command(failpoint_off_cmd)

    def _do_teardown(self, finished=False, mode=None):
        self.logger.info("Stopping all members of the replica set '%s'...", self.replset_name)

        if finished and self.load_all_extensions and self.loaded_extensions:
            delete_extension_configs(self.loaded_extensions, self.logger)

        running_at_start = self.is_running()
        if not running_at_start:
            self.logger.info(
                "All members of the replica set were expected to be running, " "but weren't."
            )

        teardown_handler = interface.FixtureTeardownHandler(self.logger)

        if self.initial_sync_node:
            if self.initial_sync_uninitialized_fcv:
                self._check_initial_sync_node_has_uninitialized_fcv(self.initial_sync_node)
                self._unpause_initial_sync(self.initial_sync_node)
            teardown_handler.teardown(self.initial_sync_node, "initial sync node", mode=mode)

        with ThreadPoolExecutor() as executor:
            tasks = []
            # Terminate the secondaries first to reduce noise in the logs.
            for node in reversed(self.nodes):
                tasks.append(
                    executor.submit(
                        teardown_handler.teardown,
                        node,
                        "replica set member on port %d" % node.port,
                        mode,
                    )
                )
            # Wait for all the teardown tasks to complete
            for task in as_completed(tasks):
                task.result()

        if teardown_handler.was_successful():
            self.logger.info("Successfully stopped all members of the replica set.")
            self.teardown_counter += 1
        else:
            self.logger.error("Stopping the replica set fixture failed.")
            raise self.fixturelib.ServerFailure(teardown_handler.get_error_message())

    def is_running(self):
        """Return True if all nodes in the replica set are running."""
        running = all(node.is_running() for node in self.nodes)

        if self.initial_sync_node:
            running = self.initial_sync_node.is_running() and running

        return running

    def get_primary(self, timeout_secs=30):
        """Return the primary from a replica set."""
        if not self.all_nodes_electable:
            # The primary is always the first element of the 'nodes' list because all other members
            # of the replica set are configured with priority=0.
            return self.nodes[0]

        def is_primary(client, node):
            """Return if `node` is master."""
            is_master = client.admin.command("isMaster")["ismaster"]
            if is_master:
                self.logger.info(
                    "The node on port %d is primary of replica set '%s'",
                    node.port,
                    self.replset_name,
                )
                return True
            return False

        return self._await_cmd_all_nodes(is_primary, "waiting for a primary", timeout_secs)

    def _await_cmd_all_nodes(self, fn, msg, timeout_secs=None):
        """Run `fn` on all nodes until it returns a truthy value.

        Return the node for which makes `fn` become truthy.

        Two arguments are passed to fn: the client for a node and
        the MongoDFixture corresponding to that node.
        """

        if timeout_secs is None:
            timeout_secs = self.AWAIT_REPL_TIMEOUT_MINS * 60
        start = time.time()
        clients = {}

        all_nodes = self.nodes.copy()
        if self.electable_initial_sync_node:
            all_nodes.append(self.initial_sync_node)

        while True:
            for node in all_nodes:
                now = time.time()
                if (now - start) >= timeout_secs:
                    msg = "Timed out while {} for replica set '{}'.".format(msg, self.replset_name)
                    self.logger.error(msg)
                    raise self.fixturelib.ServerFailure(msg)

                try:
                    if node.port not in clients:
                        clients[node.port] = interface.build_client(node, self.auth_options)

                    if fn(clients[node.port], node):
                        return node

                except pymongo.errors.AutoReconnect:
                    # AutoReconnect exceptions may occur if the primary stepped down since PyMongo
                    # last contacted it. We'll just try contacting the node again in the next round
                    # of isMaster requests.
                    continue

    def _await_read_concern_available_on_node(self, client):
        retry_time_secs = self.AWAIT_REPL_TIMEOUT_MINS * 60
        retry_start_time = time.time()
        while "$clusterTime" not in client.admin.command("ping"):
            if time.time() - retry_start_time > retry_time_secs:  # Check for timeout.
                raise self.fixturelib.ServerFailure(
                    "$clusterTime did not appear in a command response from the node on port {} in "
                    "{} seconds.".format(client.port, retry_time_secs)
                )
            time.sleep(0.1)  # Wait a little bit before trying again.

    def _await_read_concern_available(self):
        """Await $clusterTime to appear in command responses to satisfy readConcern guarantees."""
        self.logger.info("Waiting for read concern to become available on all nodes")
        for node in self.nodes:
            self.logger.info(
                "Waiting for node on port %d to have readConcern guarantees available.", node.port
            )
            self._await_read_concern_available_on_node(
                interface.build_client(
                    node, self.auth_options, read_preference=pymongo.ReadPreference.SECONDARY
                )
            )
        self.logger.info("Read concern is available on all nodes")

    def stop_primary(self, primary, background_reconfig, should_kill):
        """Stop the primary node method."""
        # Check that the fixture is still running before stepping down or killing the primary.
        # This ensures we still detect some cases in which the fixture has already crashed.
        if not self.is_running():
            raise self.fixturelib.ServerFailure(
                "ReplicaSetFixture {} expected to be running in"
                " ContinuousStepdown, but wasn't.".format(self.replset_name)
            )

        # If we're running with background reconfigs, it's possible to be in a scenario
        # where we kill a necessary voting node (i.e. in a 5 node repl set), only 2 are
        # voting. In this scenario, we want to avoid killing the primary because no
        # secondary can step up.
        if background_reconfig:
            # stagger the kill thread so that it runs a little after the reconfig thread
            time.sleep(1)
            voting_members = self.get_voting_members()

            self.logger.info("Current voting members: %s", voting_members)

            if len(voting_members) <= 3:
                # Do not kill or terminate the primary if we don't have enough voting nodes to
                # elect a new primary.
                return False

        action = "Killing" if should_kill else "Terminating"
        self.logger.info(
            "%s the primary on port %d of replica set '%s'.",
            action,
            primary.port,
            self.replset_name,
        )

        # We send the mongod process the signal to exit but don't immediately wait for it to
        # exit because clean shutdown may take a while and we want to restore write availability
        # as quickly as possible.
        teardown_mode = (
            interface.TeardownMode.KILL if should_kill else interface.TeardownMode.TERMINATE
        )
        primary.mongod.stop(mode=teardown_mode)
        return True

    def change_version_and_restart_node(self, primary, auth_options):
        """
        Select Secondary for stepUp.

        Ensure its version is different to that
        of the old primary; change the version of the Secondary is needed.
        """

        def get_chosen_node_from_replsetstatus(status_member_infos):
            max_optime = None
            chosen_index = None
            # We always select the secondary with highest optime to setup.
            for member_info in status_member_infos:
                if member_info.get("self", False):
                    # Ignore self, which is the old primary and not eligible
                    # to be re-elected in downgrade multiversion cluster.
                    continue
                optime_dict = member_info["optime"]
                if max_optime is None:
                    chosen_index = member_info["_id"]
                    max_optime = optime_dict
                else:
                    if compare_optime(optime_dict, max_optime) > 0:
                        chosen_index = member_info["_id"]
                        max_optime = optime_dict

                if chosen_index is None or max_optime is None:
                    raise self.fixturelib.ServerFailure(
                        "Failed to find a secondary eligible for "
                        f"election; index: {chosen_index}, optime: {max_optime}"
                    )

                return self.nodes[chosen_index]

        primary_client = interface.build_client(primary, auth_options)
        retry_time_secs = self.AWAIT_REPL_TIMEOUT_MINS * 60
        retry_start_time = time.time()

        while True:
            member_infos = primary_client.admin.command({"replSetGetStatus": 1})["members"]
            chosen_node = get_chosen_node_from_replsetstatus(member_infos)

            if chosen_node.change_version_if_needed(primary):
                self.logger.info(
                    "Waiting for the chosen secondary on port %d of replica set '%s' to exit.",
                    chosen_node.port,
                    self.replset_name,
                )

                teardown_mode = interface.TeardownMode.TERMINATE
                chosen_node.mongod.stop(mode=teardown_mode)
                chosen_node.mongod.wait()

                self.logger.info(
                    "Attempting to restart the chosen secondary on port %d of replica set '%s'.",
                    chosen_node.port,
                    self.replset_name,
                )

                chosen_node.setup()
                self.logger.info(interface.create_fixture_table(self))
                chosen_node.await_ready()

            if self.stepup_node(chosen_node, auth_options):
                break

            if time.time() - retry_start_time > retry_time_secs:
                raise self.fixturelib.ServerFailure(
                    "The old primary on port {} of replica set {} did not step up in"
                    " {} seconds.".format(chosen_node.port, self.replset_name, retry_time_secs)
                )

        return chosen_node

    def stepup_node(self, node, auth_options):
        """Try to step up the given node; return whether the attempt was successful."""
        try:
            self.logger.info(
                "Attempting to step up the chosen secondary on port %d of replica set '%s'.",
                node.port,
                self.replset_name,
            )
            client = interface.build_client(node, auth_options)
            client.admin.command("replSetStepUp")
            return True
        except pymongo.errors.OperationFailure:
            # OperationFailure exceptions are expected when the election attempt fails due to
            # not receiving enough votes. This can happen when the 'chosen' secondary's opTime
            # is behind that of other secondaries. We handle this by attempting to elect a
            # different secondary.
            self.logger.info(
                "Failed to step up the secondary on port %d of replica set '%s'.",
                node.port,
                self.replset_name,
            )
            return False
        except pymongo.errors.AutoReconnect:
            # It is possible for a replSetStepUp to fail with AutoReconnect if that node goes
            # into Rollback (which causes it to close any open connections).
            return False

    def restart_node(self, chosen, temporary_flags={}):
        """Restart the new step up node."""
        self.logger.info(
            "Waiting for the old primary on port %d of replica set '%s' to exit.",
            chosen.port,
            self.replset_name,
        )

        exit_code = chosen.mongod.wait()
        # This function is called after stop_primary() which could kill or cleanly shutdown the
        # process. We therefore also allow an exit code of -9.
        if exit_code in (0, -interface.TeardownMode.KILL.value):
            self.logger.info("Successfully stopped the mongod on port {:d}.".format(chosen.port))
        else:
            self.logger.warning(
                "Stopped the mongod on port {:d}. " "Process exited with code {:d}.".format(
                    chosen.port, exit_code
                )
            )
            raise self.fixturelib.ServerFailure(
                "mongod on port {:d} with pid {:d} exited with code {:d}".format(
                    chosen.port, chosen.mongod.pid, exit_code
                )
            )

        self.logger.info(
            "Attempting to restart the old primary on port %d of replica set '%s'.",
            chosen.port,
            self.replset_name,
        )

        # Restart the mongod on the old primary and wait until we can contact it again. Keep the
        # original preserve_dbpath to restore after restarting the mongod.
        original_preserve_dbpath = chosen.preserve_dbpath
        chosen.preserve_dbpath = True
        try:
            chosen.setup(temporary_flags=temporary_flags)
            self.logger.info(interface.create_fixture_table(self))
            chosen.await_ready()
        finally:
            chosen.preserve_dbpath = original_preserve_dbpath

    def get_secondaries(self):
        """Return a list of secondaries from the replica set."""
        primary = self.get_primary()
        return [node for node in self.nodes if node.port != primary.port]

    def get_secondary_indices(self):
        """Return a list of secondary indices from the replica set."""
        primary = self.get_primary()
        return [index for index, node in enumerate(self.nodes) if node.port != primary.port]

    def get_voting_members(self):
        """Return the number of voting nodes in the replica set."""
        primary = self.get_primary()
        client = primary.mongo_client()

        members = client.admin.command({"replSetGetConfig": 1})["config"]["members"]
        voting_members = [member["host"] for member in members if member["votes"] == 1]

        return voting_members

    def get_initial_sync_node(self):
        """Return initial sync node from the replica set."""
        return self.initial_sync_node

    def set_fcv(self, fcv):
        """Set the fcv used by this fixtures."""
        self.fcv = fcv

    def install_mongod(self, mongod):
        """Install a mongod node. Called by a builder."""
        self.nodes.append(mongod)

    def get_options_for_mongod(self, index):
        """Return options that may be passed to a mongod."""
        mongod_options = self.mongod_options.copy()

        mongod_options["dbpath"] = os.path.join(self._dbpath_prefix, "node{}".format(index))
        mongod_options["set_parameters"] = mongod_options.get(
            "set_parameters", self.fixturelib.make_historic({})
        ).copy()

        if index == 0 and self.use_auto_bootstrap_procedure:
            del mongod_options["replSet"]

        if self.linear_chain and index > 0:
            self.mongod_options["set_parameters"]["failpoint.forceSyncSourceCandidate"] = (
                self.fixturelib.make_historic(
                    {
                        "mode": "alwaysOn",
                        "data": {
                            "hostAndPort": self.nodes[index - 1].get_internal_connection_string()
                        },
                    }
                )
            )
        return mongod_options

    def get_logger_for_mongod(self, index):
        """Return a new logging.Logger instance.

        The instance is used as the primary, secondary, or initial sync member of a replica-set.
        """

        if index == self.initial_sync_node_idx:
            node_name = "initsync"
        elif self.all_nodes_electable:
            node_name = "node{}".format(index)
        elif index == 0:
            node_name = "primary"
        else:
            suffix = str(index - 1) if self.num_nodes > 2 else ""
            node_name = "secondary{}".format(suffix)

        if self.shard_logging_prefix is not None:
            node_name = f"{self.shard_logging_prefix}:{node_name}"
            return self.fixturelib.new_fixture_node_logger(
                "ShardedClusterFixture", self.job_num, node_name
            )

        if self.replicaset_logging_prefix is not None:
            node_name = f"{self.replicaset_logging_prefix}:{node_name}"

        return self.fixturelib.new_fixture_node_logger(
            self.__class__.__name__, self.job_num, node_name
        )

    def get_internal_connection_string(self):
        """Return the internal connection string."""
        conn_strs = [node.get_internal_connection_string() for node in self.nodes]

        # ReplicaSetFixture sets initial sync nodes as hidden,
        # which causes a mismatch if the replica set is added to the sharded cluster
        # through addShard, because the replica set's internal connection string normally
        # does include the initial sync node, but the list of hosts in the replica set from
        # running `hello`/`isMaster` does not include it. Setting hide_initial_sync_node_from_conn_string
        # to True force-hides it from the connection string.
        if self.initial_sync_node and not self.hide_initial_sync_node_from_conn_string:
            conn_strs.append(self.initial_sync_node.get_internal_connection_string())
        return self.replset_name + "/" + ",".join(conn_strs)

    def get_node_info(self):
        """Return a list of dicts of NodeInfo objects."""
        output = []
        for node in self.nodes:
            output += node.get_node_info()
        if self.initial_sync_node:
            output += self.initial_sync_node.get_node_info()
        return output

    def get_driver_connection_url(self):
        """Return the driver connection URL."""
        if self.use_replica_set_connection_string:
            # We use a replica set connection string when all nodes are electable because we
            # anticipate the client will want to gracefully handle any failovers.
            conn_strs = [node.get_internal_connection_string() for node in self.nodes]
            if self.initial_sync_node:
                conn_strs.append(self.initial_sync_node.get_internal_connection_string())
            return "mongodb://" + ",".join(conn_strs) + "/?replicaSet=" + self.replset_name
        else:
            # We return a direct connection to the expected primary when only the first node is
            # electable because we want the client to error out if a stepdown occurs.
            return self.nodes[0].get_driver_connection_url()

    def write_historic(self, obj):
        """Convert the obj to a record to track history."""
        self.fixturelib.make_historic(obj)

    def internode_validation(self):
        """
        Perform internode validation on this replica set using extended validate. Compares the 'all' and 'metadata' hashes of each collection.
        """
        self.logger.info("Waiting for all nodes to be caught up")
        primary_client = interface.build_client(self.get_primary(), self.auth_options)

        # Disable the TTL monitor to avoid inconsistent results from TTL deletes occurring between validate calls.
        previous_value = primary_client.admin.command({"getParameter": 1, "ttlMonitorEnabled": 1})
        primary_client.admin.command({"setParameter": 1, "ttlMonitorEnabled": False})

        coll = primary_client["test"]["validate.hook"].with_options(
            write_concern=pymongo.write_concern.WriteConcern(w=len(self.nodes))
        )
        coll.insert_one({"a": 1})
        coll.drop()

        res = primary_client.admin.command({"replSetGetStatus": 1})

        if "appliedOpTime" not in res["optimes"]:
            # This can be null when the node is starting up.
            return False

        clusterTime = res["optimes"]["appliedOpTime"]["ts"]

        self.logger.info("Performing Internode Validation")

        # Collections we exclude from the hash comparisons. This is because these collections can contain different document contents for valid reasons (i.e. implicitly replicated, TTL indexes, updated by background threads, etc)
        excluded_config_collections = [
            "actionlog",
            "analyzeShardKeySplitPoints",
            "cache.collections",
            "image_collection",
            "mongos",
            "rangeDeletions",
            "sampledQueries",
            "sampledQueriesDiff",
            "shards",
            "system.change_collection",
            "system.preimages",
            "system.sessions",
            "transactions",
        ]

        # the 'system.profile' collections are unreplicated and should not be compared.
        excluded_any_db_collections = ["system.profile"]

        base_hashes = {}
        filter = {"type": "collection"}
        for node in self.nodes:
            client = interface.build_client(node, self.auth_options)
            # Skip validating collections for arbiters.
            admin_db = client.get_database("admin")
            ret = admin_db.command("isMaster")
            if "arbiterOnly" in ret and ret["arbiterOnly"]:
                self.logger.info("Skipping collection validation on arbiter")
                continue

            hashes = {}
            something_set = False
            for db_name in client.list_database_names():
                # the 'local' database is unreplicated and should not be compared.
                if db_name == "local":
                    continue
                if db_name not in hashes:
                    hashes[db_name] = {}
                db = client.get_database(db_name)
                for coll in db.list_collections(filter=filter):
                    coll_name = coll["name"]
                    # Skip excluded collections which all live in the 'config' database.
                    if db_name == "config" and coll_name in excluded_config_collections:
                        continue
                    if coll_name in excluded_any_db_collections:
                        continue
                    # TODO SERVER-114904 Skip collections with names that end with a dot.
                    # Even though the server allows their creation, db.get_collection() below
                    # throws an InvalidName error.
                    if coll_name.endswith("."):
                        continue
                    # Skip collections that contain TTL indexes or TTL options.
                    indexes = db.get_collection(coll_name).list_indexes()
                    if any("expireAfterSeconds" in index for index in indexes):
                        continue
                    if "expireAfterSeconds" in coll["options"]:
                        continue

                    validate_cmd = {
                        "validate": coll_name,
                        "collHash": True,
                        "atClusterTime": clusterTime,
                    }
                    ret = db.command(validate_cmd, check=False)
                    if "all" in ret and "metadata" in ret:
                        something_set = True
                        hashes[db_name][coll_name] = {
                            "all": ret["all"],
                            "metadata": ret["metadata"],
                        }
                    elif not self.fcv:
                        # 'all' and 'metadata' should only not exist when in multiversion suite.
                        raise RuntimeError(
                            f"Missing {db_name}.{coll_name} hashes on a node outside of a multiversion suite."
                        )
                    elif something_set:
                        # we have previously gotten a hash for this node, so we should continue to get hashes for it.
                        raise RuntimeError(
                            f"Missing {db_name}.{coll_name} hashes on a node when we previously received hashes on other namespaces."
                        )

            if not base_hashes and something_set:
                base_hashes = hashes
            elif something_set:
                self.logger.info(f"Base Hashes: {base_hashes}")
                self.logger.info(f"Comparing Hashes: {hashes}")
                # Compare the sets of hashes
                for db_name in base_hashes:
                    # No collection hashes for this DB entry, skipping.
                    if not base_hashes[db_name]:
                        continue
                    if db_name not in hashes:
                        raise RuntimeError(
                            f"Missing {db_name} hashes on a node when it was expected to exist."
                        )
                    for coll_name in base_hashes[db_name]:
                        if coll_name not in hashes[db_name]:
                            raise RuntimeError(
                                f"Missing {db_name}.{coll_name} hashes on a node when it was expected to exist."
                            )
                        base_hash = base_hashes[db_name][coll_name]
                        comp_hash = hashes[db_name][coll_name]
                        if base_hash["all"] != comp_hash["all"]:
                            raise RuntimeError(
                                f"all hash difference on {db_name}.{coll_name}. {base_hash['all']} vs {comp_hash['all']}"
                            )
                        # Metadata hashes can be different on multiversion suites due to removed fields from indexes.
                        if base_hash["metadata"] != comp_hash["metadata"] and not self.fcv:
                            raise RuntimeError(
                                f"metadata hash difference on {db_name}.{coll_name}. {base_hash['metadata']} vs {comp_hash['metadata']}"
                            )

        # Reset the TTL monitor to its original value.
        primary_client.admin.command({"setParameter": 1, "ttlMonitorEnabled": previous_value})
        self.logger.info("Internode Validation Successful")


def get_last_optime(client, fixturelib):
    """Get the latest optime.

    This function is derived from _getLastOpTime() in ReplSetTest.
    """
    repl_set_status = client.admin.command({"replSetGetStatus": 1})
    conn_status = [m for m in repl_set_status["members"] if "self" in m][0]
    optime = conn_status["optime"]

    optime_is_empty = False

    if isinstance(optime, bson.Timestamp):  # PV0
        optime_is_empty = optime == bson.Timestamp(0, 0)
    else:  # PV1
        optime_is_empty = optime["ts"] == bson.Timestamp(0, 0) and optime["t"] == -1

    if optime_is_empty:
        raise fixturelib.ServerFailure(
            "Uninitialized opTime being reported by {addr[0]}:{addr[1]}: {repl_set_status}".format(
                addr=client.address, repl_set_status=repl_set_status
            )
        )

    return optime
